package io.sundial.scheduler.impl;

import io.sundial.coordination.CallbackException;
import io.sundial.coordination.Coordinator;
import io.sundial.coordination.CoordinatorException;
import io.sundial.coordination.node.Node;
import io.sundial.coordination.tree.TreeEvent;
import io.sundial.coordination.tree.TreeWatcher;
import io.sundial.coordination.watching.EventWatching;
import io.sundial.core.Callback;
import io.sundial.core.context.Context;
import io.sundial.core.lifecycle.exception.DestroyingException;
import io.sundial.core.lifecycle.exception.InitializingException;
import io.sundial.engine.exception.ShuttingException;
import io.sundial.engine.exception.StartingException;
import io.sundial.job.JobCommand;
import io.sundial.job.JobSharding;
import io.sundial.scheduler.exception.SchedulingException;
import io.sundial.task.*;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 抽象的被动式调度器
 *
 * @author Payne 646742615@qq.com
 * 2018/12/21 10:44
 */
public abstract class DisposingScheduler extends BalancingScheduler {
    private final ConcurrentMap<String, Information> executors = new ConcurrentHashMap<>();
    private final OnCommandCreated onCommandCreated = new OnCommandCreated();
    private EventWatching engineTreeWatching;

    protected SchedulingRepository schedulingRepository;
    protected TriggeringRepository triggeringRepository;

    @Override
    protected void initializing(Context context) throws InitializingException {
        super.initializing(context);

        // region 绑定调度记录仓储
        schedulingRepository = schedulingRepository != null
                ? schedulingRepository
                : context.get(SchedulingRepository.class);
        schedulingRepository.initialize(context);
        // endregion

        // region 绑定触发记录仓储
        triggeringRepository = triggeringRepository != null
                ? triggeringRepository
                : context.get(TriggeringRepository.class);
        triggeringRepository.initialize(context);
        // endregion
    }

    @Override
    protected void destroying() throws DestroyingException {
        super.destroying();

        schedulingRepository.destroy();
        schedulingRepository = null;

        triggeringRepository.destroy();
        triggeringRepository = null;
    }

    @Override
    protected void starting() throws StartingException {
        super.starting();

        //region 观察目前可用的执行节点列表
        try {
            engineTreeWatching = coordinator.watch(ENGINES_ROOT, new EngineTreeWatcher());
        } catch (CoordinatorException e) {
            throw new StartingException(e);
        }
        //endregion
    }

    @Override
    protected void shutting() throws ShuttingException {
        super.shutting();

        try {
            engineTreeWatching.close();
        } catch (CoordinatorException e) {
            throw new ShuttingException(e);
        }
    }

    @Override
    public void schedule(Task task) throws SchedulingException {
        schedule(task, System.currentTimeMillis());
    }

    @Override
    public void schedule(Task task, long time) throws SchedulingException {
        try {
            // region 保存调度记录
            Scheduling scheduling = new Scheduling(task, time, name);
            {
                Date now = new Date();
                scheduling.setDateCreated(now);
                scheduling.setLastUpdated(now);
                scheduling.setVersion(0);
            }
            Scheduling.Key schedulingKey = new Scheduling.Key(task.getName(), task.getGroup(), time);
            schedulingRepository.save(schedulingKey, scheduling);
            // endregion

            Map<String, List<JobSharding>> map = balance(executors, task);
            for (Map.Entry<String, List<JobSharding>> entry : map.entrySet()) {
                String executor = entry.getKey();
                String root = COMMANDS_ROOT + SPRT + executor;

                List<JobSharding> shardings = entry.getValue();
                for (JobSharding sharding : shardings) {
                    String path = root + SPRT + task.getGroup() + SPRT + task.getName() + SPRT + time + SPRT + sharding.getIndex();
                    JobCommand command = new JobCommand();
                    command.setJobPath(path);
                    command.setJobSharding(sharding);
                    command.setJobTime(time);
                    command.setJobArguments(task.getJobArguments());
                    byte[] data = marshall(command);

                    //region 保存触发记录
                    Triggering triggering = new Triggering(scheduling, sharding.getIndex(), sharding.getValue(), name, executor);
                    Date now = new Date();
                    triggering.setDateCreated(now);
                    triggering.setLastUpdated(now);
                    triggering.setVersion(0);
                    Triggering.Key triggeringKey = new Triggering.Key(task.getName(), task.getGroup(), time, sharding.getIndex());
                    triggeringRepository.save(triggeringKey, triggering);
                    //endregion

                    coordinator.create(path, data, true, false, onCommandCreated, triggering);
                }
            }
        } catch (Exception e) {
            throw new SchedulingException(e);
        }
    }

    /**
     * 工作引擎节点树观察者
     */
    private class EngineTreeWatcher implements TreeWatcher {
        @Override
        public void onWatched(TreeEvent event) throws Exception {
            TreeEvent.Type type = event.getType();
            switch (type) {
                case NODE_CREATED:
                case NODE_UPDATED: {
                    Node node = event.getNode();
                    String path = node.getPath();
                    if (!path.startsWith(ENGINES_ROOT + SPRT)) return;
                    String name = path.substring((ENGINES_ROOT + SPRT).length());
                    if (name.contains(SPRT)) return;

                    byte[] data = node.getData();
                    Information information = unmarshal(data, Information.class);

                    executors.put(name, information != null ? information : new Information());
                }
                break;
                case NODE_REMOVED: {
                    Node node = event.getNode();
                    String path = node.getPath();
                    if (!path.startsWith(ENGINES_ROOT + SPRT)) return;
                    String name = path.substring((ENGINES_ROOT + SPRT).length());
                    if (name.contains(SPRT)) return;

                    executors.remove(name);
                }
                break;
                case CONN_SUSPENDED:
                    break;
                case CONN_RECONNECTED:
                    break;
                case CONN_LOST:
                    break;
                case TREE_INITIALIZED:
                    break;
            }
        }
    }

    /**
     * 作业命令创建完成回调
     */
    private class OnCommandCreated implements Callback<Coordinator.CreateResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.CreateResult result, CallbackException exception) throws Exception {

        }
    }

    public SchedulingRepository getSchedulingRepository() {
        return schedulingRepository;
    }

    public void setSchedulingRepository(SchedulingRepository schedulingRepository) {
        this.schedulingRepository = schedulingRepository;
    }

    public TriggeringRepository getTriggeringRepository() {
        return triggeringRepository;
    }

    public void setTriggeringRepository(TriggeringRepository triggeringRepository) {
        this.triggeringRepository = triggeringRepository;
    }
}
